{
  "metadata" : {
    "id" : "2f2a280f-81ed-4e24-b17c-a081f920b660",
    "name" : "kafka-sensor-data-generator",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "org.apache.spark %% spark-sql % 2.3.0", "org.apache.spark %% spark-sql-kafka-0-10 % 2.3.0" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "30BAF937A31247F095E8F815E0A130EA"
    },
    "cell_type" : "markdown",
    "source" : "# Sensor Data Generator\nThis notebook serves as a sensor data simulator.\nIt generates a stream of random sensor readings for a given number of sensors.\n\nThe data is produced to a configurable Kafka topic."
  }, {
    "metadata" : {
      "id" : "BE3667BF92C2474C968FB663897C2945"
    },
    "cell_type" : "markdown",
    "source" : "### Configuration"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "D5EADC5B952546EB89BABA94FEA6AC85"
    },
    "cell_type" : "code",
    "source" : [ "// Kafka\n", "val kafkaBootstrapServer = \"172.17.0.2:9092\"\n", "val targetTopic = \"iot-data\"\n", "\n", "// File system\n", "val workDir = \"/tmp/streaming-with-spark\"\n", "\n", "// Generator\n", "val sensorCount = 100000" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "CF7FDE26F16B476D88694FC731121CD1"
    },
    "cell_type" : "markdown",
    "source" : "## Schema\nWe need a schema definition for the sensor data that we are going to generate."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "E9D6009A13594B1294C2C55F63E923B1"
    },
    "cell_type" : "code",
    "source" : [ "case class SensorData(sensorId: Int, timestamp: Long, value: Double)\n", "object SensorData {\n", "  import scala.util.Random\n", "  def randomGen(maxId:Int) = {\n", "    SensorData(Random.nextInt(maxId), System.currentTimeMillis, Random.nextDouble())\n", "  }\n", "}" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "D432E889A5C64C1FB722CDAFBD6464FF"
    },
    "cell_type" : "code",
    "source" : [ "case class Rate(timestamp: Long, value: Long)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "C13759524C1740209059997D9D1FE4C6"
    },
    "cell_type" : "markdown",
    "source" : "## We use the built-in rate generator as the base stream for our data generator"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "BCC7E1DD775A48CF80B51C96737A43B2"
    },
    "cell_type" : "code",
    "source" : [ "val baseStream = sparkSession.readStream.format(\"rate\").option(\"recordsPerSecond\", 100).load()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "66DC058C3C904140836593CB97F088DE"
    },
    "cell_type" : "code",
    "source" : [ "val sensorValues = baseStream.as[Rate].map(_ => SensorData.randomGen(sensorCount))" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "AEF94D7C908641D681E7F21F22FA06F3"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.kafka010._" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "918CD1A8190D4BF6A9A23826E06B2933"
    },
    "cell_type" : "code",
    "source" : [ "val query = sensorValues.writeStream.format(\"kafka\")\n", "  .queryName(\"kafkaWriter\")\n", "  .outputMode(\"append\")\n", "  .option(\"kafka.bootstrap.servers\", kafkaBootstrapServer) // comma-separated list of host:port\n", "  .option(\"topic\", targetTopic)\n", "  .option(\"checkpointLocation\", workDir+\"/generator-checkpoint\")\n", "  .option(\"failOnDataLoss\", \"false\") // use this option when testing\n", "  .start()\n", "\n" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "C7B26D61719145BF9D52A5B7605ADAAB"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}